Skip to content

refactor(providers): extract StreamAbortGuard helper for streaming cancel#146

Merged
ericleepi314 merged 1 commit into
fix/esc-cancel-openai-compatiblefrom
refactor/extract-stream-abort-guard
May 15, 2026
Merged

refactor(providers): extract StreamAbortGuard helper for streaming cancel#146
ericleepi314 merged 1 commit into
fix/esc-cancel-openai-compatiblefrom
refactor/extract-stream-abort-guard

Conversation

@ericleepi314
Copy link
Copy Markdown
Collaborator

Summary

Why

The Critic reviews of both #144 and #145 called out the duplication and recommended a follow-up extraction once the pattern stabilized. User explicitly asked for this in conversation: "yes please implement this follow-up."

What StreamAbortGuard provides

guard = StreamAbortGuard(abort_signal)
guard.raise_if_pre_aborted()  # before API request

with client.messages.stream(...) as stream, guard.attach(stream):
    try:
        for chunk in stream.text_stream:
            if guard.aborted:  # optional in-loop check
                break
            ...
    except Exception as exc:
        guard.reraise_if_aborted(exc)
        raise

guard.raise_if_post_aborted()  # catches abort fired between __exit__ and return

Key invariant: while attached, stream.response is closed on abort. This is enforced by both the listener firing path AND a safety-net close in __exit__ — closes the race window where AbortSignal._fire snapshots the listener list, the consumer thread observes aborted=True, breaks out, and detaches the listener before the snapshot's iteration runs. httpx.Response.close() is idempotent (if not self.is_closed guard), so the double-fire path is harmless.

Line counts

File Lines
anthropic_provider.py -88
minimax_provider.py -73
openai_compatible.py -212
_stream_abort.py (new) +211 (mostly docstring)
Net providers -262

Test plan

  • 17 new helper unit tests pin: pre/post fast-paths, aborted property, attach lifecycle, race-recovery via register-then-recheck, close-failure tolerance, no-response-attribute graceful degradation, reraise_if_aborted translation + cause chaining, __exit__ close-on-abort safety net.
  • Safety-net test mutation-verified: removed the __exit__ close branch and confirmed test_exit_closes_stream_when_signal_aborted_no_listener_fire fails with "Expected 'close' to have been called." Restored, test passes.
  • 15 provider-level tests (Anthropic + Minimax + OpenAI-compat) pass unmodified — behavior preservation across the refactor.
  • 4436 broader related tests pass.
  • Critic subagent review: APPROVE (after applying their Option 3 — pushing close-on-abort into the helper's __exit__ — and adding the race + happy-path tests).

Dependencies

Stacked on #145 (the OpenAI-compat fix). Rebase onto main once #145 merges.

🤖 Generated with Claude Code

…ncel

PRs #144 and #145 added abort-signal-aware streaming for three
providers (Anthropic, Minimax, OpenAI-compatible). The pattern across
all three is structurally identical — pre-call fast-path, register-
then-recheck listener closing the stream's response on abort, signal-
state-authoritative exception translation, listener detachment, post-
stream recheck — but the bookkeeping was inlined and triplicated.
Adding a fourth provider would have meant a fourth copy.

Extract the pattern into ``src/providers/_stream_abort.py``:

* ``StreamAbortGuard(abort_signal)`` — one per call. ``abort_signal=None``
  makes every method a no-op so providers can use it unconditionally.
* ``raise_if_pre_aborted()`` — pre-call fast-path raising ``AbortError``.
* ``raise_if_post_aborted()`` — same check at a different boundary.
* ``aborted`` property — cheap in-loop check (OpenAI-compat path).
* ``reraise_if_aborted(original_exc)`` — translate SDK exceptions to
  ``AbortError`` via ``raise ... from``; no-op if signal didn't fire.
* ``with guard.attach(stream):`` — register a listener that closes
  ``stream.response`` on abort; detach in ``__exit__``. ``__exit__``
  ALSO closes the response if ``signal.aborted`` is True at exit, as
  a safety net for the race window where ``AbortSignal._fire``
  snapshots the listener list, the consumer thread observes
  ``aborted=True``, breaks out, detaches the listener, and the
  snapshot's iteration then runs against an empty list — that path
  would otherwise leak the underlying httpx response open. The close
  is idempotent on httpx (``if not self.is_closed`` guard) so the
  double-fire path is safe.

Provider line counts:

  anthropic_provider.py    -88 lines
  minimax_provider.py      -73 lines
  openai_compatible.py    -212 lines
  _stream_abort.py        +211 (mostly docstring)

Net: providers shrink from 374 lines of abort bookkeeping to 110
(SDK-specific iteration shape only). The provider-level tests from
#144 and #145 (15 tests across three files) pass unmodified, proving
behavior preservation end-to-end.

Seventeen new unit tests in ``tests/test_stream_abort_guard.py`` pin
the helper's contract directly: pre/post fast-paths, ``aborted``
property semantics, ``attach`` lifecycle (register, detach, race-
recovery via register-then-recheck), close-failure tolerance, no-
response-attribute graceful degradation, ``reraise_if_aborted``
translation + cause chaining + no-op, and — critically — the
``__exit__`` close-on-abort safety net. The safety-net test
mutation-verified: removing the ``__exit__`` close branch makes the
test fail with the exact expected assertion.

The OpenAI-compat in-loop test gains a ``stream.response.close.called``
assertion to pin the close at the provider level too.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@ericleepi314 ericleepi314 merged commit 3b5a83f into fix/esc-cancel-openai-compatible May 15, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant